Calling LLM APIs

Reading time: ~40 minutes | Level: Advanced

The Incident

It is 2:47 AM. Your LLM-powered document analysis service has been running in production for three weeks. A law firm is using it to review contracts overnight before a 9 AM merger close. You wake up to 847 Slack notifications.

The service has been returning None for every document since midnight. You trace the logs:

2024-01-15 00:03:41 INFO Processing document 1/312
2024-01-15 00:03:44 ERROR openai.RateLimitError: Rate limit reached for gpt-4o
2024-01-15 00:03:44 INFO Processing document 2/312
2024-01-15 00:03:44 ERROR openai.RateLimitError: Rate limit reached for gpt-4o
...
2024-01-15 00:03:44 INFO Processing document 312/312
2024-01-15 00:03:44 ERROR openai.RateLimitError: Rate limit reached for gpt-4o

Your code hit a rate limit, caught the exception, returned None, and moved on. Every single document was processed in 3 milliseconds and returned no content. The retry logic? Never implemented. The rate limit backoff? Never implemented. The cost of this outage: one very unhappy client and a very awake engineer.

This lesson teaches you to never be that engineer.

What You Will Learn

  • The structure of Anthropic and OpenAI Python SDKs
  • Messages API: system/user/assistant roles, multi-turn conversations
  • Sync vs async clients and when each is appropriate
  • Retry with exponential backoff using tenacity and httpx
  • Rate limiting: tokens-per-minute, requests-per-minute
  • Batch processing patterns for many requests
  • Cost estimation before API calls
  • Structured output with JSON mode and response_format
  • Every error type and how to handle each one
  • Logging and observability for LLM calls in production

Part 1 -- SDK Setup

Anthropic

import anthropic
import os

# The client reads ANTHROPIC_API_KEY from the environment automatically.
# Passing the key explicitly is fine for testing but avoid it in production.
client = anthropic.Anthropic()

# Or, pass the key explicitly:
client = anthropic.Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])

# Configure timeouts. Default is 10 minutes, which is too long for most uses.
# timeout is in seconds. Set connect and read separately.
client = anthropic.Anthropic(
    timeout=anthropic.Timeout(
        connect=5.0,   # fail fast if the connection itself fails
        read=120.0,    # allow up to 2 minutes for the response body
        write=10.0,
        pool=5.0,
    )
)

OpenAI

import os

import httpx
import openai

# Same pattern: reads OPENAI_API_KEY from environment
client = openai.OpenAI()

# With explicit configuration
client = openai.OpenAI(
    api_key=os.environ["OPENAI_API_KEY"],
    timeout=httpx.Timeout(connect=5.0, read=120.0, write=10.0, pool=5.0),
    max_retries=0,  # We'll handle retries ourselves with tenacity
)
Tip: Both SDKs retry failed requests on their own (two retries by default, with exponential backoff). If you handle retries yourself with tenacity, set max_retries=0 on the client so the two retry layers do not multiply; tenacity then gives you full control over the schedule and logging. The Anthropic client accepts the same max_retries parameter as the OpenAI client.

Part 2 -- The Messages API

Anthropic calls its chat endpoint the Messages API; OpenAI's equivalent is the Chat Completions API. The structure is identical in concept:

messages = [
    # System message sets the model's behavior. Anthropic and OpenAI handle
    # this differently in their APIs, but the concept is the same.
    {"role": "system", "content": "You are a senior contract lawyer..."},

    # User/assistant pairs form the conversation history.
    {"role": "user", "content": "Summarize this NDA."},
    {"role": "assistant", "content": "This NDA has three key provisions..."},

    # The current user message is always last.
    {"role": "user", "content": "What is the termination clause?"},
]

Anthropic API Shape

Anthropic separates system from messages. The system prompt is a top-level parameter:

import anthropic

client = anthropic.Anthropic()

message = client.messages.create(
    model="claude-opus-4-5",
    max_tokens=1024,
    system="You are a senior contract lawyer specializing in IP law.",  # top-level
    messages=[
        {"role": "user", "content": "Summarize this NDA: [NDA text here]"},
        {"role": "assistant", "content": "This NDA contains three key provisions..."},
        {"role": "user", "content": "What is the termination clause?"},
    ],
)

# Access the response
text = message.content[0].text
print(text)

# Usage metadata -- you need this for cost tracking
print(message.usage.input_tokens)   # tokens in the messages + system
print(message.usage.output_tokens)  # tokens in the response
print(message.stop_reason)          # e.g. "end_turn" | "max_tokens" | "stop_sequence" | "tool_use"

OpenAI API Shape

OpenAI includes the system prompt as the first message in the messages list:

import openai

client = openai.OpenAI()

completion = client.chat.completions.create(
    model="gpt-4o",
    max_tokens=1024,
    messages=[
        {"role": "system", "content": "You are a senior contract lawyer..."},
        {"role": "user", "content": "Summarize this NDA: [NDA text here]"},
        {"role": "assistant", "content": "This NDA contains three key provisions..."},
        {"role": "user", "content": "What is the termination clause?"},
    ],
)

text = completion.choices[0].message.content
print(completion.usage.prompt_tokens)       # input tokens
print(completion.usage.completion_tokens)   # output tokens
print(completion.choices[0].finish_reason)  # e.g. "stop" | "length" | "tool_calls" | "content_filter"
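
Multi-turn conversations work by resending the whole history on every call. The following is a minimal sketch of a history holder that renders the same turns into either provider's request shape. Conversation, to_anthropic, and to_openai are hypothetical names chosen for illustration; they are not part of either SDK.

```python
from dataclasses import dataclass, field


@dataclass
class Conversation:
    """Holds one system prompt plus alternating user/assistant turns."""
    system: str = ""
    turns: list = field(default_factory=list)

    def add(self, role: str, content: str) -> None:
        if role not in ("user", "assistant"):
            raise ValueError(f"unsupported role: {role!r}")
        self.turns.append({"role": role, "content": content})

    def to_anthropic(self) -> dict:
        """Keyword arguments for messages.create: system is a top-level parameter."""
        kwargs = {"messages": list(self.turns)}
        if self.system:
            kwargs["system"] = self.system
        return kwargs

    def to_openai(self) -> dict:
        """Keyword arguments for chat.completions.create: system is the first message."""
        messages = list(self.turns)
        if self.system:
            messages.insert(0, {"role": "system", "content": self.system})
        return {"messages": messages}


convo = Conversation(system="You are a senior contract lawyer.")
convo.add("user", "Summarize this NDA.")
convo.add("assistant", "This NDA has three key provisions...")
convo.add("user", "What is the termination clause?")

anthropic_kwargs = convo.to_anthropic()
openai_kwargs = convo.to_openai()
print(len(anthropic_kwargs["messages"]), len(openai_kwargs["messages"]))
```

The same three turns become three messages for Anthropic and four for OpenAI, because the system prompt travels as a message there. After each reply you would call convo.add("assistant", reply_text) before adding the next user turn.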

Building a Provider-Agnostic Client

In production, you often want to switch between providers or fall back from one to another. A thin adapter layer achieves this without leaking provider-specific code throughout your codebase:

from dataclasses import dataclass, field
from typing import Protocol
import anthropic
import openai


@dataclass
class Message:
    role: str  # "user" | "assistant"
    content: str


@dataclass
class LLMResponse:
    content: str
    input_tokens: int
    output_tokens: int
    stop_reason: str
    model: str


class LLMClient(Protocol):
    """Protocol any LLM client must satisfy."""

    def complete(
        self,
        messages: list[Message],
        *,
        system: str = "",
        max_tokens: int = 1024,
        temperature: float = 0.7,
    ) -> LLMResponse: ...


class AnthropicClient:
    def __init__(self, model: str = "claude-opus-4-5") -> None:
        self._client = anthropic.Anthropic()
        self._model = model

    def complete(
        self,
        messages: list[Message],
        *,
        system: str = "",
        max_tokens: int = 1024,
        temperature: float = 0.7,
    ) -> LLMResponse:
        resp = self._client.messages.create(
            model=self._model,
            max_tokens=max_tokens,
            temperature=temperature,
            system=system,
            messages=[{"role": m.role, "content": m.content} for m in messages],
        )
        return LLMResponse(
            content=resp.content[0].text,
            input_tokens=resp.usage.input_tokens,
            output_tokens=resp.usage.output_tokens,
            stop_reason=resp.stop_reason,
            model=self._model,
        )


class OpenAIClientAdapter:
    def __init__(self, model: str = "gpt-4o") -> None:
        self._client = openai.OpenAI()
        self._model = model

    def complete(
        self,
        messages: list[Message],
        *,
        system: str = "",
        max_tokens: int = 1024,
        temperature: float = 0.7,
    ) -> LLMResponse:
        all_messages = []
        if system:
            all_messages.append({"role": "system", "content": system})
        all_messages.extend({"role": m.role, "content": m.content} for m in messages)

        resp = self._client.chat.completions.create(
            model=self._model,
            max_tokens=max_tokens,
            temperature=temperature,
            messages=all_messages,
        )
        return LLMResponse(
            content=resp.choices[0].message.content or "",
            input_tokens=resp.usage.prompt_tokens,
            output_tokens=resp.usage.completion_tokens,
            stop_reason=resp.choices[0].finish_reason,
            model=self._model,
        )

Part 3 -- Sync vs Async Clients

Both SDKs provide synchronous and asynchronous clients. Choosing the wrong one for your context has real consequences.

When to Use Sync

  • CLI tools, scripts, Jupyter notebooks
  • Single-request handlers where you do not need concurrency
  • Background worker processes that are already single-threaded

# Sync is fine for a script that processes one document at a time
client = anthropic.Anthropic()
result = client.messages.create(...)  # Blocks until complete

When to Use Async

  • FastAPI / Starlette handlers (always async)
  • Any code that processes multiple requests concurrently
  • Streaming endpoints
  • Any code that already uses asyncio

import asyncio
import anthropic

# AsyncAnthropic is the async counterpart of Anthropic.
# Create it once and reuse it so all requests share one connection pool.
client = anthropic.AsyncAnthropic()


async def analyze_document(doc_text: str) -> str:
    message = await client.messages.create(
        model="claude-opus-4-5",
        max_tokens=1024,
        messages=[{"role": "user", "content": f"Summarize: {doc_text}"}],
    )
    return message.content[0].text


async def analyze_many(documents: list[str]) -> list[str]:
    # Process all documents concurrently instead of sequentially.
    # For 10 documents each taking 3 seconds, this takes ~3 seconds total
    # instead of 30 seconds. But be careful: this will hit rate limits fast.
    tasks = [analyze_document(doc) for doc in documents]
    return await asyncio.gather(*tasks)
Danger: Never call a synchronous SDK client from inside an async function without running it in a thread executor. Calling anthropic.Anthropic().messages.create() (sync) from inside async def blocks the event loop for the entire duration of the network call -- potentially seconds. Every other async task in your application freezes.

import asyncio
import anthropic


# Wrong: blocks the event loop
async def bad_handler():
    sync_client = anthropic.Anthropic()
    return sync_client.messages.create(...)  # DO NOT DO THIS


# Right: use the async client
async def good_handler():
    async_client = anthropic.AsyncAnthropic()
    return await async_client.messages.create(...)


# Also right: run sync in a worker thread if you must use sync
async def acceptable_handler():
    sync_client = anthropic.Anthropic()
    # asyncio.to_thread (Python 3.9+) runs the callable in the default thread
    # pool. On older versions, use asyncio.get_running_loop().run_in_executor().
    return await asyncio.to_thread(lambda: sync_client.messages.create(...))
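
The effect is easy to observe without any network call. In this sketch, time.sleep stands in for a synchronous SDK call, and a heartbeat task counts how often the event loop gets a chance to run. All names here are hypothetical helpers for the demonstration.

```python
import asyncio
import time


def blocking_call() -> str:
    """Stand-in for a synchronous SDK call that takes about 0.2 seconds."""
    time.sleep(0.2)
    return "done"


async def heartbeat(ticks: list, stop: asyncio.Event) -> None:
    """Record a timestamp every 20 ms for as long as the loop is free to run us."""
    while not stop.is_set():
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.02)


async def count_ticks(run_call) -> int:
    ticks: list = []
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0)  # let the heartbeat record its first tick
    await run_call()
    stop.set()
    await hb
    return len(ticks)


async def call_directly() -> str:
    return blocking_call()  # blocks the loop: the heartbeat cannot tick


async def call_in_thread() -> str:
    return await asyncio.to_thread(blocking_call)  # the loop stays responsive


blocked_ticks = asyncio.run(count_ticks(call_directly))
threaded_ticks = asyncio.run(count_ticks(call_in_thread))
print(f"ticks while blocked: {blocked_ticks}, ticks with to_thread: {threaded_ticks}")
```

The heartbeat manages only its initial tick while the direct call runs, but keeps ticking roughly every 20 ms when the same call is moved to a thread. In a web server, those missed ticks are other users' requests.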

Part 4 -- Retry with Exponential Backoff

Rate limit errors (HTTP 429) are not failures. They are signals to slow down. Every production LLM client must implement retry with exponential backoff. Do not implement it manually; use tenacity.

from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    before_sleep_log,
)
import anthropic
import openai
import logging

logger = logging.getLogger(__name__)

# Define which exceptions should trigger a retry
RETRYABLE_ANTHROPIC = (
    anthropic.RateLimitError,
    anthropic.APIConnectionError,
    anthropic.InternalServerError,  # 500s from Anthropic
    anthropic.APITimeoutError,
)

RETRYABLE_OPENAI = (
    openai.RateLimitError,
    openai.APIConnectionError,
    openai.InternalServerError,
    openai.APITimeoutError,
)


@retry(
    retry=retry_if_exception_type(RETRYABLE_ANTHROPIC),
    wait=wait_exponential(
        multiplier=1,
        min=2,   # Start with 2 seconds
        max=60,  # Cap at 60 seconds
    ),
    stop=stop_after_attempt(5),
    before_sleep=before_sleep_log(logger, logging.WARNING),
    reraise=True,  # After all retries fail, reraise the original exception
)
def _call_anthropic_with_retry(client: anthropic.Anthropic, **kwargs):
    """Wraps the raw API call with retry logic."""
    return client.messages.create(**kwargs)

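Recent tenacity releases compute each wait as multiplier * 2^(attempt - 1), clamped to the range [min, max]. With the settings above, the waits are:

  • Before attempt 2: 2 seconds (the computed 1 second is raised to the 2-second minimum)
  • Before attempt 3: 2 seconds
  • Before attempt 4: 4 seconds
  • Before attempt 5: 8 seconds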

This is critical: without jitter, many services retrying simultaneously will all wake up at the same time and hammer the API again. Add jitter:

from tenacity import wait_exponential_jitter

@retry(
    retry=retry_if_exception_type(RETRYABLE_ANTHROPIC),
    # wait_exponential_jitter adds the jitter before applying the cap:
    # wait = min(initial * 2**(attempt - 1) + random(0, jitter), max)
    wait=wait_exponential_jitter(
        initial=2,
        max=60,
        jitter=5,  # Adds up to 5 seconds of random wait
    ),
    stop=stop_after_attempt(5),
    reraise=True,
)
async def _async_call_with_retry(client: anthropic.AsyncAnthropic, **kwargs):
    return await client.messages.create(**kwargs)
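
To see why jitter matters, it helps to compute the schedule by hand. The sketch below is plain Python, not tenacity's code: backoff_delay is a hypothetical helper that applies the "capped exponential plus uniform jitter" formula, and the 100 simulated clients are an assumption made for illustration.

```python
import random
from typing import Optional


def backoff_delay(attempt: int, *, initial: float = 2.0, cap: float = 60.0,
                  jitter: float = 0.0, rng: Optional[random.Random] = None) -> float:
    """Delay before the retry that follows failed attempt number `attempt` (1-based)."""
    rng = rng or random.Random()
    base = initial * 2 ** (attempt - 1)
    return min(base + rng.uniform(0, jitter), cap)


# Without jitter, 100 clients that failed together all retry at the same instant.
no_jitter = {backoff_delay(1) for _ in range(100)}

# With jitter, their first retries spread across a 5-second window.
rng = random.Random(42)  # seeded so the sketch is reproducible
with_jitter = [backoff_delay(1, jitter=5.0, rng=rng) for _ in range(100)]

# The un-jittered schedule doubles until it reaches the cap.
schedule = [backoff_delay(n) for n in range(1, 8)]

print(f"distinct retry times without jitter: {len(no_jitter)}")
print(f"retry window with jitter: {min(with_jitter):.2f}s to {max(with_jitter):.2f}s")
print(f"un-jittered schedule: {schedule}")
```

Without jitter there is exactly one distinct retry time, so every client hits the API together again. With jitter the retries land anywhere between 2 and 7 seconds, which spreads the load the API sees.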

Handling 429 Retry-After Headers

The Anthropic API returns a Retry-After header telling you exactly how long to wait. Respect it:

import anthropic
from tenacity import retry, retry_if_exception_type, stop_after_attempt
from tenacity.wait import wait_base


class RetryAfterWait(wait_base):
    """Wait strategy that reads the Retry-After header from the exception."""

    def __call__(self, retry_state) -> float:
        exc = retry_state.outcome.exception()

        # Anthropic and OpenAI both attach the HTTP response to status errors
        retry_after = None
        response = getattr(exc, "response", None)
        if response is not None:
            retry_after = response.headers.get("retry-after")

        if retry_after is not None:
            # Retry-After can be seconds or an HTTP date string
            try:
                return float(retry_after) + 1.0  # Add 1 second buffer
            except ValueError:
                pass  # It is a date string; fall back to default

        # Default: exponential backoff
        return min(2 ** retry_state.attempt_number, 60)


@retry(
    retry=retry_if_exception_type(anthropic.RateLimitError),
    wait=RetryAfterWait(),
    stop=stop_after_attempt(5),
    reraise=True,
)
async def smart_retry_call(client: anthropic.AsyncAnthropic, **kwargs):
    return await client.messages.create(**kwargs)
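
The wait strategy above falls back to exponential backoff when Retry-After is an HTTP date rather than a number of seconds. If you would rather honor both forms, the header can be parsed with the standard library alone. This is a sketch under that assumption; parse_retry_after is a hypothetical helper, not an SDK function.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(value: Optional[str],
                      now: Optional[datetime] = None) -> Optional[float]:
    """Return seconds to wait, or None if the header is absent or unparseable."""
    if value is None:
        return None
    value = value.strip()
    try:
        return max(0.0, float(value))  # delta-seconds form, e.g. "30"
    except ValueError:
        pass
    try:
        target = parsedate_to_datetime(value)  # HTTP-date form
    except (TypeError, ValueError):
        return None
    if target.tzinfo is None:
        # Treat a date without an offset as UTC
        target = target.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0.0, (target - now).total_seconds())


fixed_now = datetime(2015, 10, 21, 7, 27, 0, tzinfo=timezone.utc)
print(parse_retry_after("30"))
print(parse_retry_after("Wed, 21 Oct 2015 07:28:00 GMT", now=fixed_now))
print(parse_retry_after("soon"))
```

Inside RetryAfterWait you could call this helper on the header value and fall back to exponential backoff only when it returns None.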

Part 5 -- Rate Limiting

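Even with retry backoff, you need proactive rate limiting. Providers enforce several limits at once, scoped to your account (organization or workspace, depending on the provider and tier). Check the current rate-limit documentation for the exact set that applies to you:

  • Requests per minute (RPM)
  • Tokens per minute (TPM) -- counted as input + output combined, or as separate input and output budgets, depending on the provider
  • Tokens per day (TPD), on some providers and tiers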

A token bucket is the right data structure. It fills at a constant rate and depletes with each request:

import asyncio
import time
from dataclasses import dataclass, field

import anthropic


@dataclass
class TokenBucket:
    """
    Rate limiter using the token bucket algorithm.
    rate: tokens added per second
    capacity: maximum tokens in the bucket
    """
    rate: float      # tokens/second
    capacity: float  # max bucket size

    _tokens: float = field(init=False)
    _last_refill: float = field(init=False)
    _lock: asyncio.Lock = field(init=False)

    def __post_init__(self):
        self._tokens = self.capacity  # Start full
        self._last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    def _refill(self) -> None:
        """Add tokens based on elapsed time since last refill."""
        now = time.monotonic()
        elapsed = now - self._last_refill
        # Tokens earned = rate (tokens/sec) * elapsed (sec)
        self._tokens = min(self.capacity, self._tokens + elapsed * self.rate)
        self._last_refill = now

    async def consume(self, tokens: float = 1.0) -> None:
        """
        Wait until enough tokens are available, then consume them.
        Blocks the caller if the bucket is empty.
        """
        if tokens > self.capacity:
            # The bucket can never hold this many tokens, so waiting would never end.
            raise ValueError(f"requested {tokens} tokens but capacity is {self.capacity}")

        while True:
            async with self._lock:
                self._refill()
                if self._tokens >= tokens:
                    self._tokens -= tokens
                    return
                # Not enough tokens -- calculate how long to wait.
                deficit = tokens - self._tokens
                wait_time = deficit / self.rate
            # Sleep outside the lock so others can check the bucket, then loop
            # and re-check: another caller may have taken tokens in the meantime.
            await asyncio.sleep(wait_time)


# Illustrative limits for claude-opus-4-5 (check Anthropic docs for your model and tier)
CLAUDE_TOKEN_LIMITER = TokenBucket(
    rate=40_000 / 60,  # 40K tokens per minute -> tokens/second
    capacity=40_000,
)
CLAUDE_REQUEST_LIMITER = TokenBucket(
    rate=50 / 60,  # 50 requests per minute -> requests/second
    capacity=50,
)


async def rate_limited_complete(
    client: anthropic.AsyncAnthropic,
    messages: list[dict],
    estimated_tokens: int,
    **kwargs,
) -> anthropic.types.Message:
    """
    Call the API only after acquiring rate limit budget.
    estimated_tokens: tiktoken estimate of input + expected output tokens
    """
    # Consume from both buckets. This will block if we are over limit.
    # Both checks run concurrently -- the slower one gates the request.
    await asyncio.gather(
        CLAUDE_REQUEST_LIMITER.consume(1),
        CLAUDE_TOKEN_LIMITER.consume(estimated_tokens),
    )
    return await client.messages.create(messages=messages, **kwargs)
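
The same limits tell you roughly how long a batch must take, which is worth knowing before you promise a deadline. The sketch below is plain arithmetic. The 6,000 tokens per document is an assumed figure for illustration, the limits are the illustrative ones from the code above, and min_batch_minutes is a hypothetical helper.

```python
def min_batch_minutes(n_requests: int, tokens_per_request: int,
                      rpm: int, tpm: int) -> float:
    """Approximate steady-state minutes needed to send a batch under both limits.

    Ignores the initial burst that a full bucket allows.
    """
    by_requests = n_requests / rpm
    by_tokens = (n_requests * tokens_per_request) / tpm
    return max(by_requests, by_tokens)


N_DOCS = 312             # the overnight batch from the incident
TOKENS_PER_DOC = 6_000   # assumption: input plus expected output per contract

minutes = min_batch_minutes(N_DOCS, TOKENS_PER_DOC, rpm=50, tpm=40_000)
token_bound = N_DOCS * TOKENS_PER_DOC / 40_000
request_bound = N_DOCS / 50
binding = "tokens" if token_bound > request_bound else "requests"
print(f"about {minutes:.1f} minutes; binding limit: {binding}")
```

Under these assumptions the request limit alone would allow the batch in about 6 minutes, but the token limit stretches it to about 47. For long documents the token budget is usually the constraint, not the request count.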

Part 6 -- Batch Processing Patterns

When you have hundreds of documents to process, naive sequential processing is too slow and naive concurrent processing hits rate limits. The right pattern is a bounded semaphore:

import asyncio
import anthropic
from tqdm.asyncio import tqdm_asyncio  # pip install tqdm


async def process_batch(
    documents: list[str],
    *,
    max_concurrent: int = 5,  # Never fire more than 5 requests simultaneously
    model: str = "claude-opus-4-5",
) -> list[str | Exception]:
    """
    Process many documents concurrently with bounded parallelism.
    Returns results in the same order as input, even if processed out of order.
    Errors are returned as Exception objects, not raised, so one failure
    does not abort the entire batch.
    """
    client = anthropic.AsyncAnthropic()
    # Semaphore limits simultaneous in-flight requests
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_one(doc: str, idx: int) -> tuple[int, str | Exception]:
        async with semaphore:  # Blocks if max_concurrent are already running
            try:
                msg = await client.messages.create(
                    model=model,
                    max_tokens=512,
                    messages=[{"role": "user", "content": f"Summarize: {doc}"}],
                )
                return idx, msg.content[0].text
            except Exception as e:
                # Return the exception instead of raising.
                # The caller decides whether to retry or skip.
                return idx, e

    tasks = [process_one(doc, i) for i, doc in enumerate(documents)]
    # tqdm_asyncio.gather shows a progress bar as tasks complete
    results_unordered = await tqdm_asyncio.gather(*tasks)

    # Reorder results to match input order
    results: list[str | Exception | None] = [None] * len(documents)
    for idx, result in results_unordered:
        results[idx] = result

    return results  # type: ignore[return-value]


async def main():
    docs = ["Contract A text...", "Contract B text...", "Contract C text..."]
    results = await process_batch(docs, max_concurrent=3)

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Document {i} failed: {result}")
        else:
            print(f"Document {i}: {result[:100]}...")
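
You can check that a semaphore really bounds concurrency without calling any API. In this sketch, asyncio.sleep stands in for the network call and a counter records the peak number of requests in flight. run_bounded and fake_request are hypothetical names for the demonstration.

```python
import asyncio


async def run_bounded(n_tasks: int, max_concurrent: int) -> int:
    """Run n_tasks fake requests and return the peak number in flight at once."""
    semaphore = asyncio.Semaphore(max_concurrent)
    in_flight = 0
    peak = 0

    async def fake_request() -> None:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for the network call
            in_flight -= 1

    # All 50 coroutines are created up front, but only max_concurrent of them
    # get past the semaphore at any moment.
    await asyncio.gather(*(fake_request() for _ in range(n_tasks)))
    return peak


peak = asyncio.run(run_bounded(n_tasks=50, max_concurrent=5))
print(f"peak concurrent requests: {peak}")
```

The peak never exceeds the semaphore's limit, however many tasks are queued. Note that the semaphore caps requests in flight, not requests per minute, so it complements the token bucket from Part 5 and does not replace it.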

For very large batches (thousands of documents), use Anthropic's native batch API:

async def submit_message_batch(
    prompts: list[str],
    model: str = "claude-opus-4-5",
) -> str:
    """
    Submit a batch of requests using Anthropic's batch API.
    Cheaper than individual requests. Results available in minutes to hours.
    Returns the batch ID for later retrieval.
    """
    client = anthropic.AsyncAnthropic()

    requests = [
        {
            "custom_id": f"request-{i}",
            "params": {
                "model": model,
                "max_tokens": 512,
                "messages": [{"role": "user", "content": p}],
            },
        }
        for i, p in enumerate(prompts)
    ]

    batch = await client.messages.batches.create(requests=requests)
    print(f"Batch submitted: {batch.id}")
    print(f"Status: {batch.processing_status}")
    return batch.id


async def poll_batch_until_done(batch_id: str) -> list[dict]:
    """Poll a batch until complete and return results."""
    client = anthropic.AsyncAnthropic()

    while True:
        batch = await client.messages.batches.retrieve(batch_id)
        if batch.processing_status == "ended":
            break
        print(f"Waiting... {batch.request_counts}")
        await asyncio.sleep(60)  # Check every minute

    results = []
    async for result in await client.messages.batches.results(batch_id):
        if result.result.type == "succeeded":
            results.append({
                "id": result.custom_id,
                "content": result.result.message.content[0].text,
            })
        else:
            # Non-success types are "errored", "canceled", and "expired".
            # Only "errored" carries an error payload, so record the type itself.
            results.append({
                "id": result.custom_id,
                "error": result.result.type,
            })
    return results

Part 7 -- Cost Estimation Before Calls

Spending $3.00 per 1M input tokens sounds cheap until you have a 50K-token prompt and process 10,000 documents. Calculate cost before every call in production:

import tiktoken
from dataclasses import dataclass

# Illustrative prices per million tokens. Prices change often;
# always check each provider's current pricing page.
PRICING: dict[str, dict[str, float]] = {
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "claude-opus-4-5": {"input": 3.00, "output": 15.00},
    "claude-sonnet-4-6": {"input": 3.00, "output": 15.00},
    "claude-haiku-3-5": {"input": 0.80, "output": 4.00},
}


@dataclass
class CostEstimate:
    model: str
    input_tokens: int
    output_tokens: int
    input_cost_usd: float
    output_cost_usd: float
    total_cost_usd: float


def estimate_cost(
    messages: list[dict],
    model: str,
    max_output_tokens: int,
) -> CostEstimate:
    """
    Estimate the cost of an API call before making it.
    Uses tiktoken for token counting (close for OpenAI, approximate for Anthropic).
    """
    # Use an OpenAI tokenizer as a stand-in for non-OpenAI models. Claude uses
    # a different tokenizer, so treat the count as a rough estimate. Recent
    # Anthropic SDK versions also offer client.messages.count_tokens for
    # exact input counts.
    try:
        enc = tiktoken.encoding_for_model(model)
    except KeyError:
        enc = tiktoken.get_encoding("cl100k_base")  # Fallback

    # Count tokens in all messages
    input_tokens = 0
    for msg in messages:
        content = msg.get("content", "")
        if isinstance(content, str):
            input_tokens += len(enc.encode(content))
        input_tokens += 4  # Rough allowance for role tokens + message overhead

    # Unknown models fall back to a deliberately pessimistic price
    pricing = PRICING.get(model, {"input": 5.00, "output": 20.00})
    input_cost = (input_tokens / 1_000_000) * pricing["input"]
    output_cost = (max_output_tokens / 1_000_000) * pricing["output"]

    return CostEstimate(
        model=model,
        input_tokens=input_tokens,
        output_tokens=max_output_tokens,  # Upper bound; actual may be less
        input_cost_usd=input_cost,
        output_cost_usd=output_cost,
        total_cost_usd=input_cost + output_cost,
    )


def guard_cost(estimate: CostEstimate, limit_usd: float = 0.10) -> None:
    """Raise if a single call exceeds the cost limit."""
    if estimate.total_cost_usd > limit_usd:
        raise ValueError(
            f"Estimated cost ${estimate.total_cost_usd:.4f} exceeds limit "
            f"${limit_usd:.4f} for model {estimate.model}. "
            f"Input tokens: {estimate.input_tokens}, "
            f"Max output: {estimate.output_tokens}"
        )


# Usage example (client is an anthropic.Anthropic instance)
def checked_call(client, messages: list[dict], model: str, max_tokens: int) -> str:
    estimate = estimate_cost(messages, model, max_tokens)
    guard_cost(estimate, limit_usd=0.05)  # Abort if > $0.05 per call
    print(f"Estimated cost: ${estimate.total_cost_usd:.4f}")
    response = client.messages.create(
        model=model, max_tokens=max_tokens, messages=messages
    )
    return response.content[0].text
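
To put the opening figures of this part into numbers, here is the arithmetic for a 50K-token prompt run over 10,000 documents at $3.00 per million input tokens. It counts input tokens only; output tokens would add to the total.

```python
PRICE_PER_MILLION_INPUT_USD = 3.00  # the figure quoted at the start of this part
PROMPT_TOKENS = 50_000
DOCUMENTS = 10_000

per_call_usd = PROMPT_TOKENS / 1_000_000 * PRICE_PER_MILLION_INPUT_USD
total_input_tokens = PROMPT_TOKENS * DOCUMENTS
input_cost_usd = total_input_tokens / 1_000_000 * PRICE_PER_MILLION_INPUT_USD

print(f"per call: ${per_call_usd:.2f}")
print(f"{total_input_tokens:,} input tokens -> ${input_cost_usd:,.2f}")
```

Each call costs about $0.15 in input alone, which already exceeds the $0.10 default limit in guard_cost, and the full run comes to $1,500.00. This is the kind of number you want to see before the job starts, not on the invoice.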

Part 8 -- Structured Output

LLMs output text. You need structured data. Two approaches: JSON mode (model outputs JSON) and structured prompting (model follows a schema described in the prompt).

OpenAI JSON Mode

import json
import openai
from pydantic import BaseModel

client = openai.OpenAI()


class ContractSummary(BaseModel):
    parties: list[str]
    effective_date: str
    termination_clause: str
    key_obligations: list[str]
    governing_law: str


def extract_contract_info(contract_text: str) -> ContractSummary:
    """Extract structured data from a contract using JSON mode."""
    response = client.chat.completions.create(
        model="gpt-4o",
        # JSON mode yields syntactically valid JSON, but not a particular
        # schema, and the output can still be cut off at max_tokens.
        response_format={"type": "json_object"},
        messages=[
            {
                "role": "system",
                "content": (
                    "Extract contract information and return as JSON with these fields: "
                    "parties (list of party names), effective_date (ISO 8601 string), "
                    "termination_clause (string description), "
                    "key_obligations (list of strings), governing_law (string). "
                    "Return only valid JSON, no other text."
                ),
            },
            {"role": "user", "content": contract_text},
        ],
        max_tokens=1024,
    )

    choice = response.choices[0]
    if choice.finish_reason == "length":
        raise ValueError("Response hit max_tokens; the JSON is likely truncated")

    raw_json = choice.message.content
    data = json.loads(raw_json)
    # Pydantic raises ValidationError if fields are missing or mistyped
    return ContractSummary(**data)
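
JSON mode promises parseable JSON, not your schema, so the reply still needs validation before it reaches downstream code. The sketch below shows the checks using only the standard library so it runs without an API key. parse_contract_json and REQUIRED_FIELDS are hypothetical names, and the sample reply is made up.

```python
import json
from typing import Optional

# Field name -> expected Python type, mirroring the ContractSummary fields
REQUIRED_FIELDS = {
    "parties": list,
    "effective_date": str,
    "termination_clause": str,
    "key_obligations": list,
    "governing_law": str,
}


def parse_contract_json(raw: Optional[str], finish_reason: str) -> dict:
    """Validate a JSON-mode reply; raise ValueError with a specific reason on failure."""
    if finish_reason == "length":
        raise ValueError("response was cut off at max_tokens; JSON is likely incomplete")
    if not raw:
        raise ValueError("response had no content")
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"response is not valid JSON: {exc}") from exc
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object at the top level")
    for name, expected_type in REQUIRED_FIELDS.items():
        if name not in data:
            raise ValueError(f"missing field: {name}")
        if not isinstance(data[name], expected_type):
            raise ValueError(f"field {name} should be of type {expected_type.__name__}")
    return data


sample_reply = json.dumps({
    "parties": ["Acme Corp", "Globex LLC"],
    "effective_date": "2024-01-15",
    "termination_clause": "Either party may terminate with 30 days notice.",
    "key_obligations": ["Keep shared information confidential"],
    "governing_law": "Delaware",
})
parsed = parse_contract_json(sample_reply, finish_reason="stop")
print(parsed["parties"])
```

In the function above, the Pydantic model performs the field checks for you. The point of the sketch is the order of checks: truncation first, then syntax, then shape, each failing with a message that says which one went wrong.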

OpenAI Structured Outputs (Strict Mode)

GPT-4o supports strict schema enforcement -- the output is guaranteed to match your Pydantic model exactly:

from openai import OpenAI
from pydantic import BaseModel

client = OpenAI()


class AnalysisStep(BaseModel):
    explanation: str
    output: str


class ContractAnalysis(BaseModel):
    steps: list[AnalysisStep]
    final_answer: str


completion = client.beta.chat.completions.parse(
    model="gpt-4o-2024-08-06",
    messages=[
        {"role": "system", "content": "Analyze the contract step by step."},
        {"role": "user", "content": "...contract text..."},
    ],
    response_format=ContractAnalysis,  # Pydantic model enforced server-side
)

# .parsed is already a ContractAnalysis instance -- no json.loads() needed
analysis = completion.choices[0].message.parsed
print(analysis.final_answer)
for step in analysis.steps:
    print(f" - {step.explanation}")

Anthropic Structured Output via XML Tags

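Claude has not traditionally offered a JSON mode equivalent to OpenAI's (tool use with an input schema is the other common route to structured output). A widely used alternative is XML tags, which Anthropic's prompting guidance recommends for delimiting structure and which tend to be easy to extract from a response even when the model adds surrounding text: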

import re
import xml.etree.ElementTree as ET
import anthropic

client = anthropic.Anthropic()


def extract_with_xml_tags(contract_text: str) -> dict:
    """Use XML tags to get structured output from Claude."""
    response = client.messages.create(
        model="claude-opus-4-5",
        max_tokens=1024,
        system=(
            "You extract structured information from legal documents. "
            "Always respond using the exact XML format requested."
        ),
        messages=[
            {
                "role": "user",
                "content": f"""Analyze this contract and extract key information.

{contract_text}

Respond in this exact XML format:
<analysis>
<parties>
<party>Party Name 1</party>
<party>Party Name 2</party>
</parties>
<effective_date>YYYY-MM-DD</effective_date>
<termination_clause>Description here</termination_clause>
<governing_law>State/Country</governing_law>
</analysis>""",
            }
        ],
    )

    # Extract XML from response (model may include preamble text)
    text = response.content[0].text
    xml_match = re.search(r"<analysis>.*?</analysis>", text, re.DOTALL)
    if not xml_match:
        raise ValueError(f"No XML found in response: {text[:200]}")

    root = ET.fromstring(xml_match.group())
    parties = [p.text for p in root.findall(".//party") if p.text]

    return {
        "parties": parties,
        "effective_date": root.findtext("effective_date", ""),
        "termination_clause": root.findtext("termination_clause", ""),
        "governing_law": root.findtext("governing_law", ""),
    }

Part 9 -- Error Types and Handling

Every error type requires a different response. Never catch a bare Exception and move on silently.

import anthropic
import logging

logger = logging.getLogger(__name__)


def safe_complete(client: anthropic.Anthropic, **kwargs) -> str | None:
    """
    Complete with comprehensive error handling.
    Returns None for recoverable-but-exhausted errors.
    Re-raises for programmer errors that require code fixes.
    """
    try:
        response = client.messages.create(**kwargs)
        return response.content[0].text

    except anthropic.AuthenticationError as e:
        # HTTP 401: wrong or expired API key. This is a configuration error.
        # Log it and alert -- this will not fix itself with retries.
        logger.critical("Anthropic authentication failed: %s", e)
        raise  # Re-raise: this is a programmer/config error

    except anthropic.PermissionDeniedError as e:
        # HTTP 403: your key does not have access to this model or feature.
        logger.error("Permission denied for Anthropic API: %s", e)
        raise

    except anthropic.NotFoundError as e:
        # HTTP 404: model name typo or model was deprecated.
        logger.error("Anthropic resource not found: %s", e)
        raise

    except anthropic.UnprocessableEntityError as e:
        # HTTP 422: the request is well-formed but semantically invalid.
        logger.error("Invalid request to Anthropic API: %s", e)
        raise

    except anthropic.BadRequestError as e:
        # HTTP 400: malformed or invalid request. This is a programmer error.
        logger.error("Bad request to Anthropic API (fix the code): %s", e)
        raise

    except anthropic.RateLimitError as e:
        # HTTP 429: too many requests or too many tokens. Should be caught by
        # the retry decorator first, but catch as last resort.
        retry_after = getattr(e.response, "headers", {}).get("retry-after", "unknown")
        logger.warning("Anthropic rate limit hit. Retry-After: %s", retry_after)
        return None

    except anthropic.InternalServerError as e:
        # Anthropic server error. Usually transient.
        logger.error("Anthropic server error (HTTP %s): %s", e.status_code, e)
        return None

    except anthropic.APITimeoutError as e:
        # Request took longer than the client timeout. APITimeoutError is a
        # subclass of APIConnectionError, so it must be caught first.
        logger.warning("Anthropic request timed out: %s", e)
        return None

    except anthropic.APIConnectionError as e:
        # Network error: DNS failure, connection refused, connection reset.
        logger.error("Network error connecting to Anthropic: %s", e)
        return None

Error Taxonomy

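Transient errors should be retried: RateLimitError (429), InternalServerError (5xx), APIConnectionError, and APITimeoutError. Permanent errors require code or configuration changes: BadRequestError (400), AuthenticationError (401), PermissionDeniedError (403), NotFoundError (404), and UnprocessableEntityError (422).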
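
The same split can be expressed as a function of the HTTP status code, which is handy when you work with raw HTTP responses or want one rule for several providers. This is a sketch of that rule only; should_retry is a hypothetical helper, and passing None to mean "no HTTP response at all" is a convention chosen for this example.

```python
from typing import Optional

# Status codes that call for a code or configuration fix, not a retry
PERMANENT_STATUS = {
    400: "bad request",
    401: "authentication",
    403: "permission denied",
    404: "not found",
    422: "unprocessable entity",
}


def should_retry(status_code: Optional[int]) -> bool:
    """Return True for transient failures, False for permanent ones.

    None means no HTTP response arrived (connection error or timeout).
    """
    if status_code is None:
        return True   # network problems are usually transient
    if status_code == 429:
        return True   # rate limited: slow down and try again
    if status_code >= 500:
        return True   # server-side error
    return False      # remaining 4xx codes need a fix on our side


for code in (None, 429, 500, 503, 400, 401, 404):
    label = "retry" if should_retry(code) else "raise"
    print(f"{code}: {label}")
```

Keeping the rule in one place means the retry decorator, the error handler, and the dashboards all agree on what counts as transient.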

Part 10 -- Logging and Observability

Logging LLM calls in production requires capturing more than errors. You need input/output, latency, token usage, and cost per request:

import time
import uuid
import logging
import json
from contextlib import contextmanager
from dataclasses import dataclass, field, asdict
from typing import Any, Generator

import anthropic

# PRICING is the price table defined in Part 7.

logger = logging.getLogger("llm.calls")


@dataclass
class LLMCallLog:
    call_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    model: str = ""
    input_tokens: int = 0
    output_tokens: int = 0
    latency_ms: float = 0.0
    cost_usd: float = 0.0
    stop_reason: str = ""
    error: str | None = None
    # Truncated previews for debugging (always sanitize PII before logging!)
    input_preview: str = ""   # First 200 chars of last user message
    output_preview: str = ""  # First 200 chars of response
    metadata: dict[str, Any] = field(default_factory=dict)


@contextmanager
def log_llm_call(
    model: str,
    messages: list[dict],
    **metadata: Any,
) -> Generator[LLMCallLog, None, None]:
    """
    Context manager that logs an LLM call with timing, tokens, and cost.

    Usage:
        with log_llm_call("claude-opus-4-5", messages, user_id="u123") as log:
            response = client.messages.create(...)
            log.input_tokens = response.usage.input_tokens
            log.output_tokens = response.usage.output_tokens
            log.output_preview = response.content[0].text[:200]
    """
    last_user_msg = ""
    for m in reversed(messages):
        if m.get("role") == "user":
            content = m.get("content", "")
            last_user_msg = content if isinstance(content, str) else str(content)
            break

    call_log = LLMCallLog(
        model=model,
        input_preview=last_user_msg[:200],
        metadata=metadata,
    )
    start = time.perf_counter()

    try:
        yield call_log
    except Exception as e:
        call_log.error = f"{type(e).__name__}: {e}"
        raise
    finally:
        call_log.latency_ms = (time.perf_counter() - start) * 1000

        # Calculate cost if we have token counts
        if call_log.input_tokens > 0:
            pricing = PRICING.get(model, {"input": 5.00, "output": 20.00})
            call_log.cost_usd = (
                (call_log.input_tokens / 1_000_000) * pricing["input"] +
                (call_log.output_tokens / 1_000_000) * pricing["output"]
            )

        # Emit as structured JSON for log aggregators (Datadog, Loki, etc.)
        logger.info(json.dumps(asdict(call_log)))


# Usage in an async handler
async def summarize_with_logging(text: str, user_id: str) -> str:
    client = anthropic.AsyncAnthropic()
    messages = [{"role": "user", "content": f"Summarize: {text}"}]

    with log_llm_call("claude-opus-4-5", messages, user_id=user_id) as log:
        response = await client.messages.create(
            model="claude-opus-4-5",
            max_tokens=512,
            messages=messages,
        )
        log.input_tokens = response.usage.input_tokens
        log.output_tokens = response.usage.output_tokens
        log.stop_reason = response.stop_reason
        output_text = response.content[0].text
        log.output_preview = output_text[:200]

    return output_text

For production, pipe these JSON logs to a log aggregation service (Datadog, Grafana Loki, CloudWatch). Build dashboards for:

  • Average tokens per call (detects prompt bloat over time)
  • P95/P99 latency (detects slowdowns before they become outages)
  • Error rate by error type (separates transient from permanent failures)
  • Cost per user / per endpoint / per day (catches runaway spend)
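
Before the dashboards exist, the same numbers can be pulled from the JSON log lines with a short script. The sketch below runs on made-up records in the shape that log_llm_call emits. summarize and the sample data are hypothetical, and a real aggregator would compute these for you.

```python
import json
import statistics
from collections import defaultdict

# Made-up log lines: 20 successful calls and one rate-limit failure
LOG_LINES = [
    json.dumps({
        "model": "claude-opus-4-5",
        "latency_ms": 100.0 * i,
        "cost_usd": 0.01,
        "error": None,
        "metadata": {"user_id": "u1" if i % 2 else "u2"},
    })
    for i in range(1, 21)
]
LOG_LINES.append(json.dumps({
    "model": "claude-opus-4-5",
    "latency_ms": 50.0,
    "cost_usd": 0.0,
    "error": "RateLimitError: rate limit reached",
    "metadata": {"user_id": "u1"},
}))


def summarize(lines: list) -> dict:
    records = [json.loads(line) for line in lines]
    latencies = [r["latency_ms"] for r in records if r["error"] is None]
    # quantiles(n=100) returns 99 cut points; index 94 is the 95th percentile
    p95 = statistics.quantiles(latencies, n=100)[94]
    cost_by_user = defaultdict(float)
    for r in records:
        cost_by_user[r["metadata"].get("user_id", "unknown")] += r["cost_usd"]
    errors = sum(1 for r in records if r["error"] is not None)
    return {
        "p95_latency_ms": p95,
        "error_rate": errors / len(records),
        "cost_by_user": dict(cost_by_user),
    }


summary = summarize(LOG_LINES)
print(json.dumps(summary, indent=2))
```

Failed calls are excluded from the latency percentile here so that fast failures do not make the service look quicker than it is; whether that is the right choice depends on what the dashboard is meant to answer.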

Putting It Together: A Production-Ready Client

import anthropic
import asyncio
import logging
from typing import Any

from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential_jitter,
    retry_if_exception_type,
)

# estimate_cost and guard_cost come from Part 7; log_llm_call from Part 10.

logger = logging.getLogger(__name__)

RETRYABLE = (
    anthropic.RateLimitError,
    anthropic.APIConnectionError,
    anthropic.InternalServerError,
    anthropic.APITimeoutError,
)


class ProductionAnthropicClient:
    """
    A production-ready Anthropic client with:
    - Retry with jittered exponential backoff
    - Cost estimation and guard rails
    - Structured logging
    - Full async support
    """

    def __init__(
        self,
        model: str = "claude-opus-4-5",
        max_cost_per_call: float = 0.10,
    ) -> None:
        self._client = anthropic.AsyncAnthropic(
            timeout=anthropic.Timeout(connect=5.0, read=120.0, write=10.0, pool=5.0),
            max_retries=0,  # tenacity below owns the retry policy
        )
        self._model = model
        self._max_cost = max_cost_per_call

    @retry(
        retry=retry_if_exception_type(RETRYABLE),
        wait=wait_exponential_jitter(initial=2, max=60, jitter=5),
        stop=stop_after_attempt(5),
        reraise=True,
    )
    async def complete(
        self,
        messages: list[dict],
        *,
        system: str = "",
        max_tokens: int = 1024,
        temperature: float = 0.7,
        **metadata: Any,
    ) -> str:
        # Cost check before every call. The system prompt is included here
        # only so that its tokens are counted in the estimate.
        all_msgs = ([{"role": "system", "content": system}] if system else []) + messages
        estimate = estimate_cost(all_msgs, self._model, max_tokens)
        guard_cost(estimate, self._max_cost)

        with log_llm_call(self._model, messages, **metadata) as log:
            response = await self._client.messages.create(
                model=self._model,
                max_tokens=max_tokens,
                temperature=temperature,
                system=system,
                messages=messages,
            )
            log.input_tokens = response.usage.input_tokens
            log.output_tokens = response.usage.output_tokens
            log.stop_reason = response.stop_reason
            text = response.content[0].text
            log.output_preview = text[:200]

        return text

Key Takeaways

  • Use anthropic.AsyncAnthropic and openai.AsyncOpenAI inside async code. Calling sync clients from async functions blocks the entire event loop.
  • The system prompt is a top-level parameter in Anthropic's API. In OpenAI's API it is the first message with role: "system".
  • Always implement retry with jittered exponential backoff using tenacity. Read the Retry-After header from 429 responses and respect it.
  • Use a token bucket for proactive rate limiting before sending requests, not just retry after hitting 429.
  • Estimate token cost before every call. Add cost guard rails. Log actual cost with every response.
  • Distinguish error types: authentication and permission errors are permanent (fix the code or config). Rate limit, connection, and server errors are transient (retry).
  • JSON mode (OpenAI) guarantees syntactically valid JSON but not your schema; strict structured outputs additionally enforce the schema. For Claude, use XML tags for reliable structured extraction.
  • Log every LLM call as structured JSON with model, tokens, latency, cost, and error. Build dashboards from these logs.

Practice Problems

Problem 1: Implement an async client that fires the same prompt at both Claude and GPT-4o simultaneously and returns whichever response arrives first. Cancel the slower request. Handle the case where one provider is entirely unavailable.

Problem 2: Write a CostLedger class that tracks cumulative API spend per user per day. Raise an exception when a user exceeds $5.00 of daily spend. The ledger must be async-safe for concurrent requests from the same user.

Problem 3: The following code silently drops errors. Rewrite it to distinguish between retryable and non-retryable errors, implement retry for retryable errors, and raise immediately for non-retryable ones:

def get_summary(text: str) -> str:
    try:
        client = openai.OpenAI()
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": f"Summarize: {text}"}],
        )
        return response.choices[0].message.content
    except Exception:
        return ""

Problem 4: Implement a MultiModelFallback client that tries claude-opus-4-5 first, falls back to gpt-4o on any InternalServerError, and falls back to claude-haiku-3-5 if both fail. Log which provider was ultimately used for each request.

Problem 5: The batch processor in Part 6 creates all tasks immediately using asyncio.gather. Rewrite it using asyncio.Queue as a work queue so that new tasks are only created when a worker slot becomes available. This prevents memory issues with very large batches (100,000+ documents).

© 2026 EngineersOfAI. All rights reserved.